package org.totoro.context;

import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.executor.ShardingContexts;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.totoro.context.task.DataflowTask;
import org.totoro.context.task.SimpleTask;

import javax.annotation.Resource;

/**
 * @author YHL
 * @version V1.0
 * @Description: 注册中心
 * @date 2018-03-28
 */
@Configuration
public class ElasticJobConfig {


    @Resource
    private CoordinatorRegistryCenter regCenter;


    @Bean(initMethod = "init")
    public CoordinatorRegistryCenter createRegistryCenter(ElasticJobProperties elasticJobProperties) {

        ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(
                elasticJobProperties.getZkAddr(),
                elasticJobProperties.getName());

        // zk 重试次数
        zookeeperConfiguration.setMaxRetries(10);
        // zk 连接超时次数
        zookeeperConfiguration.setConnectionTimeoutMilliseconds(1000 * 60);


        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);


        return regCenter;
    }

    @Bean(initMethod = "init")
    public JobScheduler simpleJobScheduler(SimpleTask simpleTask) {
        return new SpringJobScheduler(simpleTask, regCenter, getSimpleJobConfiguration(simpleTask.getClass(), "* 0/2 * * * ?", 1,
                "0=a001,1=b001,2=c001"));
    }

    @Bean(initMethod = "init")
    public JobScheduler dataflowScheduler(DataflowTask dataflowTask) {

        Class<? extends DataflowTask> dataflowTaskClass = dataflowTask.getClass();

        JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder(dataflowTaskClass.getName(), "0 0/1 * * * ?", 5).build();

        DataflowJobConfiguration dataflowJobConfiguration = new DataflowJobConfiguration(jobCoreConfiguration, dataflowTaskClass.getName(), true);

        LiteJobConfiguration build = LiteJobConfiguration.newBuilder(dataflowJobConfiguration).build();

        return new SpringJobScheduler(dataflowTask, regCenter, build);
    }


    /**
     * @param jobClass               实际运行任务的class
     * @param cron                   调度表达式
     * @param shardingTotalCount     任务并发执行数大小
     * @param shardingItemParameters 设置任务的参数 例如 0=A ，表示0的这台主机，执行的时候会在分片中拿到 A,主要作用是通过参数控制，任务的行为
     * @return
     */
    public LiteJobConfiguration getSimpleJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) {


        /**
         * 设置任务的调度配置
         */
        JobCoreConfiguration build = JobCoreConfiguration.newBuilder(jobClass.getName() + "-", cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build();
        /**
         * 构建job配置
         */
        SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(build, jobClass.getCanonicalName());
        LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration).build();

        return liteJobConfiguration;

        /**
         * 官方配置例子
         */
//        return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(
//                jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName())).overwrite(true).build();
    }

    public void addTask(SimpleJob task, String cron) {
        new SpringJobScheduler(task, regCenter, getSimpleJobConfiguration(task.getClass(), cron, 1,
                "0=a001,1=b001,2=c001"), new ElasticJobListener() {
            @Override
            public void beforeJobExecuted(ShardingContexts shardingContexts) {

                System.out.println("beforeJobExecuted");
            }

            @Override
            public void afterJobExecuted(ShardingContexts shardingContexts) {

                System.out.println("afterJobExecuted");

            }
        }).init();
    }


}
